// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <chrono>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>

#include <nebula/util/concurrent_map.h>
#include <nebula/future/mutex.h>

namespace parquet::encryption {
    using ::nebula::util::ConcurrentMap;

    namespace internal {
        // Point in time on the system (wall) clock, with the offset held as a
        // double-precision count of seconds.
        using TimePoint = std::chrono::time_point<std::chrono::system_clock,
                                                  std::chrono::duration<double> >;

        // Reads the system clock and converts the reading to TimePoint.
        inline TimePoint CurrentTimePoint() {
            const std::chrono::system_clock::time_point wall_clock_now =
                    std::chrono::system_clock::now();
            return TimePoint(wall_clock_now);
        }

        // Pairs a cached value with the point in time after which it counts as expired.
        //
        // A default-constructed entry carries a default-constructed timestamp and a
        // default-initialized item.
        template<typename E>
        class ExpiringCacheEntry {
        public:
            ExpiringCacheEntry() = default;

            // cached_item: value to store; it is moved into the entry.
            // expiration_interval_seconds: lifetime in seconds, counted from the
            //   moment of construction.
            ExpiringCacheEntry(E cached_item, double expiration_interval_seconds)
                : expiration_timestamp_(CurrentTimePoint() +
                                        std::chrono::duration<double>(expiration_interval_seconds)),
                  cached_item_(std::move(cached_item)) {
            }

            // Returns true once the current time is strictly later than the
            // expiration timestamp.
            bool IsExpired() const {
                const auto now = CurrentTimePoint();
                return (now > expiration_timestamp_);
            }

            // Returns a copy of the stored item. Const-qualified because it does not
            // modify the entry, so it can also be called through const references.
            E cached_item() const { return cached_item_; }

        private:
            // Deliberately not const: a const data member deletes the assignment
            // operators, which would prevent replacing an entry in place
            // (e.g. via insert_or_assign) inside a container.
            TimePoint expiration_timestamp_;
            E cached_item_;
        };

        // This class is to avoid the below warning when compiling KeyToolkit class with VS2015
        // warning C4503: decorated name length exceeded, name was truncated
        //
        // It wraps an ExpiringCacheEntry whose payload is a shared pointer to a
        // ConcurrentMap<std::string, V>, and forwards the expiration check and the
        // item accessor to it.
        template<typename V>
        class ExpiringCacheMapEntry {
        public:
            ExpiringCacheMapEntry() = default;

            // cached_item: shared map to store; ownership is moved into the entry
            //   rather than copied, which avoids an extra reference-count update.
            // expiration_interval_seconds: lifetime in seconds, forwarded to the
            //   wrapped ExpiringCacheEntry.
            explicit ExpiringCacheMapEntry(
                std::shared_ptr<ConcurrentMap<std::string, V> > cached_item,
                double expiration_interval_seconds)
                : map_cache_(std::move(cached_item), expiration_interval_seconds) {
            }

            // Const-qualified to match the const IsExpired() of the wrapped entry.
            bool IsExpired() const { return map_cache_.IsExpired(); }

            // Returns a shared pointer to the wrapped map.
            std::shared_ptr<ConcurrentMap<std::string, V> > cached_item() {
                return map_cache_.cached_item();
            }

        private:
            // ConcurrentMap object may be accessed and modified at many places at the same time,
            // from multiple threads, or even removed from cache.
            ExpiringCacheEntry<std::shared_ptr<ConcurrentMap<std::string, V> > > map_cache_;
        };
    } // namespace internal

    // Two-level cache with expiration of internal caches according to token lifetime.
    // External cache is per token, internal is per string key.
    // Wrapper class around:
    //    std::unordered_map<std::string,
    //    internal::ExpiringCacheEntry<std::unordered_map<std::string, V>>>
    // This cache is safe to be shared between threads.
    template<typename V>
    class TwoLevelCacheWithExpiration {
    public:
        TwoLevelCacheWithExpiration() {
            last_cache_cleanup_timestamp_ = internal::CurrentTimePoint();
        }

        std::shared_ptr<ConcurrentMap<std::string, V> > GetOrCreateInternalCache(
            const std::string &access_token, double cache_entry_lifetime_seconds) {
            auto lock = mutex_.Lock();

            auto external_cache_entry = cache_.find(access_token);
            if (external_cache_entry == cache_.end() ||
                external_cache_entry->second.IsExpired()) {
                cache_.insert({
                    access_token, internal::ExpiringCacheMapEntry<V>(
                        std::shared_ptr<ConcurrentMap<std::string, V> >(
                            new ConcurrentMap<std::string, V>()),
                        cache_entry_lifetime_seconds)
                });
            }

            return cache_[access_token].cached_item();
        }

        void CheckCacheForExpiredTokens(double cache_cleanup_period_seconds) {
            auto lock = mutex_.Lock();

            const auto now = internal::CurrentTimePoint();
            if (now > (last_cache_cleanup_timestamp_ +
                       std::chrono::duration<double>(cache_cleanup_period_seconds))) {
                RemoveExpiredEntriesNoMutex();
                last_cache_cleanup_timestamp_ =
                        now + std::chrono::duration<double>(cache_cleanup_period_seconds);
            }
        }

        void RemoveExpiredEntriesFromCache() {
            auto lock = mutex_.Lock();

            RemoveExpiredEntriesNoMutex();
        }

        void Remove(const std::string &access_token) {
            auto lock = mutex_.Lock();
            cache_.erase(access_token);
        }

        void Clear() {
            auto lock = mutex_.Lock();
            cache_.clear();
        }

    private:
        void RemoveExpiredEntriesNoMutex() {
            for (auto it = cache_.begin(); it != cache_.end();) {
                if (it->second.IsExpired()) {
                    it = cache_.erase(it);
                } else {
                    ++it;
                }
            }
        }

        std::unordered_map<std::string, internal::ExpiringCacheMapEntry<V> > cache_;
        internal::TimePoint last_cache_cleanup_timestamp_;
        ::nebula::util::Mutex mutex_;
    };
} // namespace parquet::encryption
